我們在上一章的時候,Flink 的輸入 source 是使用 Kafka,它很好用,但我們常常面對的卻是各種 OLTP 資料庫。而如果你打開 Flink 官網,會看到在 DataStream Connectors 底下有 JDBC 的存在:
但是當你仔細一看,糟了個糕,裡面只有支援 sink 卻沒有 source。
這是因為 OLTP 架構並不是設計給 streaming 這種情境使用的,你唯一能做的只有透過輪詢的方式,不停的去要最新的資料來模擬成 streaming source。
所以,我們要來客製化一個 source了。
public class CustomPostgresSource extends RichParallelSourceFunction<String> {
private final String jdbcUrl;
private final String username;
private final String password;
private final long pollingInterval;
private transient Connection connection;
private transient PreparedStatement preparedStatement;
private transient ValueState<Long> lastQueryTimeState;
public CustomPostgresSource(String jdbcUrl, String username, String password, long pollingInterval) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.pollingInterval = pollingInterval;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = DriverManager.getConnection(jdbcUrl, username, password);
preparedStatement = connection.prepareStatement("SELECT * FROM example_data WHERE last_modified > ?");
lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTime", Long.class));
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (true) {
long lastQueryTime = lastQueryTimeState.value() != null ? lastQueryTimeState.value() : 0L;
preparedStatement.setTimestamp(1, new Timestamp(lastQueryTime));
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
// Emit data to downstream
String data = resultSet.getString("some_column");
ctx.collect(data);
}
// Update the last query time
long currentTimestamp = System.currentTimeMillis();
lastQueryTimeState.update(currentTimestamp);
// Sleep for pollingInterval milliseconds before the next query
Thread.sleep(pollingInterval);
}
}
@Override
public void cancel() {
try {
if (preparedStatement != null) {
preparedStatement.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
// Handle exception
}
}
}
這是一個相對簡單的客製化 source function ,我們自己連線到 postgres 的表格,並每 n 秒查詢一次。同時 SQL 有一個 last_modified
欄位可以做為 where 條件,減少我們查詢的資料量,同時確保跟之前的資料沒有重覆。
這個方案可以提供你一個假 JDBC streaming source,但要注意以下幾點:
last_modified
有正確被更新,並且有 update 到 ValueState
內,同時在 open
時也有被載入回來。這是為了保證 checkpoint/savepoint 能記錄到這個值,並在重啟時能還原到正確的值。cancel
時要將 connection 關閉,以免 MemoryLeak以上,你就能自己簡單做出一個自己的 JDBC source 了。
官方案例中,Table API 倒是有支援 JDBC 的 source。但我在研究的時候卡了不少的關,而且我並不喜歡 Table API,他強制要求開發者要使用他的 SQL 語法,而且 SQL 跟原本 Streaming 的觀念落差較大。儘管官方認為這樣可以將 batch 跟 streaming 的語法統一,但可能等之後再來深入研究吧。
讀者如果願意,也可以試試看使用 Table API 做為 JDBC 的 source 解法。